package cn.doitedu.etl_trash;

import com.alibaba.fastjson.JSON;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.logical.RawType;
import org.apache.flink.types.Row;

import java.util.HashMap;

/**
 * @Author: deep as the sea
 * @Site: <a href="https://www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2022/12/9
 * @Desc:
 * Real-time sync of the user registration table from the business DB into HBase.
 * Key points: use the mysql-cdc connector to capture changes from MySQL's binlog,
 * then lightly reshape the rows and write them to HBase via the hbase connector.
 **/
public class SyncJob_UserInfoTable2Hbase {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Exactly-once checkpoints every 2 seconds; checkpoint data is stored on the local filesystem.
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/checkpoint");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // CDC connector table over the business DB's ums_member table.
        // Only a subset of columns useful for testing is mapped:
        //   id, username, phone, status, create_time, gender, province, city, job
        tEnv.executeSql("CREATE TABLE ums_member_source (    " +
                "      username STRING,                             " +
                "      phone STRING,                                " +
                "      status int,                                  " +
                "      create_time timestamp(3),                    " +
                "      gender int,                                  " +
                "      birthday date,                               " +
                "      province STRING,                             " +
                "      city STRING,                                 " +
                "      job STRING ,                                 " +
                "      source_type INT ,                            " +
                "     PRIMARY KEY (username) NOT ENFORCED           " +
                "     ) WITH (                                 " +
                "     'connector' = 'mysql-cdc',               " +
                "     'hostname' = 'doitedu'   ,               " +
                "     'port' = '3306'          ,               " +
                "     'username' = 'root'      ,               " +
                "     'password' = 'root'      ,               " +
                "     'database-name' = 'realtimedw',          " +
                "     'table-name' = 'ums_member'              " +
                ")");


        // HBase connector sink table: rowkey = username, column family 'f' with a single qualifier 'q'.
        // BUGFIX: the original DDL was missing the opening parenthesis of the column list,
        // which would make this statement fail to parse at runtime.
        tEnv.executeSql("CREATE TABLE ums_member_hbasesink ( " +
                " username STRING, " +
                " f ROW<q STRING>, " +
                " PRIMARY KEY (username) NOT ENFORCED " +
                ") WITH (                             " +
                " 'connector' = 'hbase-2.2',          " +
                " 'table-name' = 'dim_user_info',     " +
                " 'zookeeper.quorum' = 'doitedu:2181' " +
                ")");



        // Target storage format in HBase: username as the rowkey, all other fields packed
        // into a single JSON string value. Flink SQL (pre-1.16) has no built-in function
        // for this (1.16 adds JSON_OBJECT), so we register a custom scalar function.
        tEnv.createTemporarySystemFunction("toJsonObject", ToJsonObject.class);


        // Reshape the raw rows into a (username, json) structure.
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW res_view AS SELECT         "+
                        " username ,                                       "+
                        " toJsonObject(                                    "+
                        "   'phone',phone,                                 "+
                        "   'status',cast(status as string),               "+
                        "   'create_time', cast(create_time as string),    "+
                        "   'gender', cast(gender as string),              "+
                        "   'province',cast(province as string),           "+
                        "   'city',cast(city as string),                   "+
                        "   'job' , cast(job as string)                    "+
                        " ) as q                                           "+
                        " FROM ums_member_source                           ");

        // Write the reshaped rows into the HBase sink table.
        tEnv.executeSql("insert into ums_member_hbasesink select username,ROW(q) from res_view");


    }

    /**
     * Scalar UDF that assembles an even number of arguments, interpreted as
     * alternating key/value pairs, into a JSON object string.
     *
     * Example: eval("phone", "123", "city", "bj") -> {"phone":"123","city":"bj"}
     */
    public static class ToJsonObject extends ScalarFunction {
        /**
         * @param eles alternating key/value String pairs; the count must be even
         * @return a JSON object string built from the pairs
         * @throws IllegalArgumentException if an odd number of arguments is passed
         */
        public String eval(String... eles) {
            // IllegalArgumentException is a RuntimeException subclass, so existing
            // callers catching RuntimeException are unaffected.
            if (eles.length % 2 != 0) throw new IllegalArgumentException("输入的参数个数必须为偶数");
            // Pair up the arguments into a map; presized to avoid rehashing.
            HashMap<String, Object> datas = new HashMap<>(eles.length);
            for (int i = 0; i < eles.length; i = i + 2) {
                // eles[i] is already a String — no toString() needed (the original
                // call would also NPE on a null key; HashMap tolerates null keys).
                datas.put(eles[i], eles[i + 1]);
            }
            // Serialize the map to a JSON string.
            return JSON.toJSONString(datas);
        }
    }

}
